PySpark Cheat Sheet
Source: GitHub pyspark-cheatsheet
This cheat sheet will help you learn PySpark and write PySpark apps faster. Everything in here is fully functional PySpark code you can run or adapt to your programs.
These snippets are licensed under the CC0 1.0 Universal License. That means you can freely copy and adapt these code snippets and you don't need to give attribution or include any notices.
These snippets use DataFrames loaded from various data sources:
- "Auto MPG Data Set" available from the UCI Machine Learning Repository.
- customer_spend.csv, a generated time series dataset.
- date_examples.csv, a generated dataset with various date and time formats.
- weblog.csv, a cleaned version of this web log dataset.
These snippets were tested against the Spark 3.2.2 API. This page was last updated 2022-09-19 15:31:03.
Make note of these helpful links:
- PySpark DataFrame Operations
- Built-in Spark SQL Functions
- MLlib Main Guide
- Structured Streaming Guide
- PySpark SQL Functions Source
Generate the Cheatsheet
You can generate the cheatsheet by running cheatsheet.py in your PySpark environment as follows:
- Install dependencies:
pip3 install -r requirements.txt
- Generate README.md:
python3 cheatsheet.py
- Generate cheatsheet.ipynb:
python3 cheatsheet.py --notebook
Table of contents
- Accessing Data Sources
- Load a DataFrame from CSV
- Load a DataFrame from a Tab Separated Value (TSV) file
- Save a DataFrame in CSV format
- Load a DataFrame from Parquet
- Save a DataFrame in Parquet format
- Load a DataFrame from JSON Lines (jsonl) Formatted Data
- Save a DataFrame into a Hive catalog table
- Load a Hive catalog table into a DataFrame
- Load a DataFrame from a SQL query
- Load a CSV file from Amazon S3
- Load a CSV file from Oracle Cloud Infrastructure (OCI) Object Storage
- Read an Oracle DB table into a DataFrame using a Wallet
- Write a DataFrame to an Oracle DB table using a Wallet
- Write a DataFrame to a Postgres table
- Read a Postgres table into a DataFrame
- Data Handling Options
- Provide the schema when loading a DataFrame from CSV
- Save a DataFrame to CSV, overwriting existing data
- Save a DataFrame to CSV with a header
- Save a DataFrame in a single CSV file
- Save DataFrame as a dynamic partitioned table
- Overwrite specific partitions
- Load a CSV file with a money column into a DataFrame
- DataFrame Operations
- Add a new column to a DataFrame
- Modify a DataFrame column
- Add a column with multiple conditions
- Add a constant column
- Concatenate columns
- Drop a column
- Change a column name
- Change multiple column names
- Change all column names at once
- Convert a DataFrame column to a Python list
- Convert a scalar query to a Python value
- Consume a DataFrame row-wise as Python dictionaries
- Select particular columns from a DataFrame
- Create an empty dataframe with a specified schema
- Create a constant dataframe
- Convert String to Double
- Convert String to Integer
- Get the size of a DataFrame
- Get a DataFrame's number of partitions
- Get data types of a DataFrame's columns
- Convert an RDD to Data Frame
- Print the contents of an RDD
- Print the contents of a DataFrame
- Process each row of a DataFrame
- DataFrame Map example
- DataFrame Flatmap example
- Create a custom UDF
- Transforming Data
- Sorting and Searching
- Filter a column using a condition
- Filter based on a specific column value
- Filter based on an IN list
- Filter based on a NOT IN list
- Filter values based on keys in another DataFrame
- Get Dataframe rows that match a substring
- Filter a Dataframe based on a custom substring search
- Filter based on a column's length
- Multiple filter conditions
- Sort DataFrame by a column
- Take the first N rows of a DataFrame
- Get distinct values of a column
- Remove duplicates
- Grouping
- count(*) on a particular column
- Group and sort
- Filter groups based on an aggregate value, equivalent to SQL HAVING clause
- Group by multiple columns
- Aggregate multiple columns
- Aggregate multiple columns with custom orderings
- Get the maximum of a column
- Sum a list of columns
- Sum a column
- Aggregate all numeric columns
- Count unique after grouping
- Count distinct values on all columns
- Group by then filter on the count
- Find the top N per row group (use N=1 for maximum)
- Group key/values into a list
- Compute a histogram
- Compute global percentiles
- Compute percentiles within a partition
- Compute percentiles after aggregating
- Filter rows with values below a target percentile
- Aggregate and rollup
- Aggregate and cube
- Joining DataFrames
- File Processing
- Handling Missing Data
- Dealing with Dates
- Unstructured Analytics
- Pandas
- Convert Spark DataFrame to Pandas DataFrame
- Convert Pandas DataFrame to Spark DataFrame with Schema Detection
- Convert Pandas DataFrame to Spark DataFrame using a Custom Schema
- Convert N rows from a DataFrame to a Pandas DataFrame
- Grouped Aggregation with Pandas
- Use a Pandas Grouped Map Function via applyInPandas
- Data Profiling
- Data Management
- Save to a Delta Table
- Update records in a DataFrame using Delta Tables
- Merge into a Delta table
- Show Table Version History
- Load a Delta Table by Version ID (Time Travel Query)
- Load a Delta Table by Timestamp (Time Travel Query)
- Compact a Delta Table
- Add custom metadata to a Delta table write
- Read custom Delta table metadata
- Spark Streaming
- Connect to Kafka using SASL PLAIN authentication
- Create a windowed Structured Stream over input CSV files
- Create an unwindowed Structured Stream over input CSV files
- Add the current timestamp to a DataFrame
- Session analytics on a DataFrame
- Call a UDF only when a threshold is reached
- Streaming Machine Learning
- Control stream processing frequency
- Write a streaming DataFrame to a database
- Time Series
- Machine Learning
- Prepare data for training with a VectorAssembler
- A basic Random Forest Regression model
- Hyperparameter tuning
- Encode string variables as numbers
- One-hot encode a categorical variable
- Optimize a model after a data preparation pipeline
- Evaluate Model Performance
- Get feature importances of a trained model
- Plot Hyperparameter tuning metrics
- Compute correlation matrix
- Save a model
- Load a model and use it for transformations
- Load a model and use it for predictions
- Load a classification model and use it to compute confidences for output labels
- Performance
- Get the Spark version
- Log messages using Spark's Log4J
- Cache a DataFrame
- Show the execution plan, with costs
- Partition by a Column Value
- Range Partition a DataFrame
- Change Number of DataFrame Partitions
- Coalesce DataFrame partitions
- Set the number of shuffle partitions
- Sample a subset of a DataFrame
- Run multiple concurrent jobs in different pools
- Print Spark configuration properties
- Set Spark configuration properties
- Publish Metrics to Graphite
- Increase Spark driver/executor heap space
Accessing Data Sources
Loading data stored in filesystems or databases, and saving it.
Load a DataFrame from CSV
See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html for a list of supported options.
df = spark.read.format("csv").option("header", True).load("data/auto-mpg.csv")
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0| 3504.| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0| 3693.| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0| 3436.| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0| 3433.| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0| 3449.| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0| 4341.| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0| 4354.| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0| 4312.| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0| 4425.| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0| 3850.| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
Load a DataFrame from a Tab Separated Value (TSV) file
See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html for a list of supported options.
df = (
spark.read.format("csv")
.option("header", True)
.option("sep", "\t")
.load("data/auto-mpg.tsv")
)
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0| 3504.| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0| 3693.| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0| 3436.| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0| 3433.| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0| 3449.| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0| 4341.| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0| 4354.| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0| 4312.| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0| 4425.| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0| 3850.| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
Save a DataFrame in CSV format
See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html for a list of supported options.
auto_df.write.csv("output.csv")
Load a DataFrame from Parquet
df = spark.read.format("parquet").load("data/auto-mpg.parquet")
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0| 3504.| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0| 3693.| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0| 3436.| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0| 3433.| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0| 3449.| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0| 4341.| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0| 4354.| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0| 4312.| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0| 4425.| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0| 3850.| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
Save a DataFrame in Parquet format
auto_df.write.parquet("output.parquet")
Load a DataFrame from JSON Lines (jsonl) Formatted Data
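The JSON Lines (jsonl) format stores one JSON document per line. If your data has a mostly regular structure, this works better than nesting the records in an array, because each line can be parsed on its own. spark.read.json expects this line-delimited format by default. See jsonlines.org.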
df = spark.read.json("data/weblog.jsonl")
# Code snippet result:
+----------+----------+--------+----------+----------+------+
| client| country| session| timestamp| uri| user|
+----------+----------+--------+----------+----------+------+
|{false,...|Bangladesh|55fa8213| 869196249|http://...|dde312|
|{true, ...| Niue|2fcd4a83|1031238717|http://...|9d00b9|
|{true, ...| Rwanda|013b996e| 628683372|http://...|1339d4|
|{false,...| Austria|07e8a71a|1043628668|https:/...|966312|
|{false,...| Belize|b23d05d8| 192738669|http://...|2af1e1|
|{false,...|Lao Peo...|d83dfbae|1066490444|http://...|844395|
|{false,...|French ...|e77dfaa2|1350920869|https:/...| null|
|{false,...|Turks a...|56664269| 280986223|http://...| null|
|{false,...| Ethiopia|628d6059| 881914195|https:/...|8ab45a|
|{false,...|Saint K...|85f9120c|1065114708|https:/...| null|
+----------+----------+--------+----------+----------+------+
only showing top 10 rows
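To show what "one document per line" means in practice, here is a minimal pure-Python sketch that uses only the standard json module. The records are made up for illustration and are not taken from weblog.jsonl.

```python
import json

# Hypothetical records, loosely shaped like the weblog data.
records = [
    {"session": "55fa8213", "country": "Bangladesh", "user": "dde312"},
    {"session": "2fcd4a83", "country": "Niue", "user": None},
]

# JSON Lines: serialize each record as one compact JSON document per line,
# with no enclosing array.
jsonl_text = "\n".join(json.dumps(r) for r in records) + "\n"

# Reading it back: every non-empty line is parsed independently.
parsed = [json.loads(line) for line in jsonl_text.splitlines() if line.strip()]

print(jsonl_text, end="")
```

Because no line depends on its neighbours, a reader can start at any line boundary. A single top-level array has to be parsed as a whole.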
Save a DataFrame into a Hive catalog table
Save a DataFrame to a Hive-compatible catalog. Use a bare table name to save in the session's current database, or database.table to save in a specific database.
auto_df.write.mode("overwrite").saveAsTable("autompg")
Load a Hive catalog table into a DataFrame
Load a DataFrame from a particular table. Use a bare table name to load from the session's current database, or database.table to load from a specific database.
df = spark.table("autompg")
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0| 3504.| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0| 3693.| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0| 3436.| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0| 3433.| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0| 3449.| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0| 4341.| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0| 4354.| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0| 4312.| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0| 4425.| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0| 3850.| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
Load a DataFrame from a SQL query
This example loads a DataFrame from a query run over a table in a Hive-compatible catalog.
df = spark.sql(
"select carname, mpg, horsepower from autompg where horsepower > 100 and mpg > 25"
)
# Code snippet result:
+----------+----+----------+
| carname| mpg|horsepower|
+----------+----+----------+
| bmw 2002|26.0| 113.0|
|chevrol...|28.8| 115.0|
|oldsmob...|26.8| 115.0|
|dodge colt|27.9| 105.0|
|datsun ...|32.7| 132.0|
|oldsmob...|26.6| 105.0|
+----------+----+----------+
Load a CSV file from Amazon S3
This example shows how to load a CSV file from AWS S3. It uses an access key pair and the SimpleAWSCredentialsProvider. For other authentication options, refer to the Hadoop-AWS module documentation.
import configparser
import os
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_key = config.get("default", "aws_access_key_id").replace('"', "")
secret_key = config.get("default", "aws_secret_access_key").replace('"', "")
# Requires compatible hadoop-aws and aws-java-sdk-bundle JARs.
spark.conf.set(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
)
spark.conf.set("fs.s3a.access.key", access_key)
spark.conf.set("fs.s3a.secret.key", secret_key)
df = (
spark.read.format("csv")
.option("header", True)
.load("s3a://cheatsheet111/auto-mpg.csv")
)
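You can check the credential-parsing step without AWS or Spark. This minimal sketch parses credentials-file text from a string instead of ~/.aws/credentials. The helper name read_aws_keys and the sample keys are hypothetical.

```python
import configparser


def read_aws_keys(text, profile="default"):
    """Return (access_key, secret_key) from AWS credentials-file text.

    Hypothetical helper: mirrors the configparser logic above, but reads a
    string so it can be tested without touching ~/.aws/credentials.
    """
    config = configparser.ConfigParser()
    config.read_string(text)
    # Strip stray double quotes, as the original snippet does.
    access_key = config.get(profile, "aws_access_key_id").replace('"', "")
    secret_key = config.get(profile, "aws_secret_access_key").replace('"', "")
    return access_key, secret_key


sample = """
[default]
aws_access_key_id = "AKIAEXAMPLE"
aws_secret_access_key = "secretEXAMPLE"
"""

access_key, secret_key = read_aws_keys(sample)
```

With real credentials, pass the two returned values to spark.conf.set exactly as in the snippet above.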
Load a CSV file from Oracle Cloud Infrastructure (OCI) Object Storage
This example shows loading data from Oracle Cloud Infrastructure Object Storage using an API key.
import oci
oci_config = oci.config.from_file()
# SparkContext.getConf() returns a copy, so changes to it are ignored.
# Set the filesystem options on the session configuration instead.
spark.conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
spark.conf.set("fs.oci.client.auth.userId", oci_config["user"])
spark.conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
spark.conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
spark.conf.set(
"fs.oci.client.hostname",
"https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
)
PATH = "oci://<your_bucket>@<your_namespace>/<your_path>"
df = spark.read.format("csv").option("header", True).load(PATH)
Read an Oracle DB table into a DataFrame using a Wallet
Get the tnsname from tnsnames.ora. The wallet path should point to the directory of an extracted wallet. The wallet files must be available on all nodes.
password = "my_password"
table = "source_table"
tnsname = "my_tns_name"
user = "ADMIN"
wallet_path = "/path/to/your/wallet"
properties = {
"driver": "oracle.jdbc.driver.OracleDriver",
"oracle.net.tns_admin": wallet_path,  # directory containing tnsnames.ora
"password": password,
"user": user,
}
url = f"jdbc:oracle:thin:@{tnsname}?TNS_ADMIN={wallet_path}"
df = spark.read.jdbc(url=url, table=table, properties=properties)
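The wallet directory appears in both the JDBC URL and the connection properties, so it can help to build them in one place. This is a minimal pure-Python sketch. The helper name oracle_jdbc_options is hypothetical, and it assumes, as the snippet does, that the tnsname is defined in the wallet's tnsnames.ora.

```python
def oracle_jdbc_options(tnsname, wallet_path, user, password):
    """Build the JDBC URL and properties for an Oracle wallet connection.

    Hypothetical helper. oracle.net.tns_admin points at the directory that
    holds tnsnames.ora (the extracted wallet), not at the tnsname itself.
    """
    url = f"jdbc:oracle:thin:@{tnsname}?TNS_ADMIN={wallet_path}"
    properties = {
        "driver": "oracle.jdbc.driver.OracleDriver",
        "oracle.net.tns_admin": wallet_path,
        "user": user,
        "password": password,
    }
    return url, properties


url, properties = oracle_jdbc_options(
    "my_tns_name", "/path/to/your/wallet", "ADMIN", "my_password"
)
# With a SparkSession and the Oracle JDBC driver available:
# df = spark.read.jdbc(url=url, table="source_table", properties=properties)
```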
Write a DataFrame to an Oracle DB table using a Wallet
Get the tnsname from tnsnames.ora. The wallet path should point to the directory of an extracted wallet. The wallet files must be available on all nodes.
password = "my_password"
table = "target_table"
tnsname = "my_tns_name"
user = "ADMIN"
wallet_path = "/path/to/your/wallet"
properties = {
"driver": "oracle.jdbc.driver.OracleDriver",
"oracle.net.tns_admin": wallet_path,  # directory containing tnsnames.ora
"password": password,
"user": user,
}
url = f"jdbc:oracle:thin:@{tnsname}?TNS_ADMIN={wallet_path}"
# Possible modes are "Append", "Overwrite", "Ignore", "Error"
df.write.jdbc(url=url, table=table, mode="Append", properties=properties)
Write a DataFrame to a Postgres table
You need a Postgres JDBC driver to connect to a Postgres database.
Options include:
- Add org.postgresql:postgresql:<version> to spark.jars.packages
- Provide the JDBC driver using spark-submit --jars
- Add the JDBC driver to your Spark runtime (not recommended)
If you use Delta Lake, there is a special procedure for specifying spark.jars.packages; see the source code that generates this file for details.
import os
pg_database = os.environ.get("PGDATABASE") or "postgres"
pg_host = os.environ.get("PGHOST") or "localhost"
pg_password = os.environ.get("PGPASSWORD") or "password"
pg_user = os.environ.get("PGUSER") or "postgres"
table = "autompg"
properties = {
"driver": "org.postgresql.Driver",
"user": pg_user,
"password": pg_password,
}
url = f"jdbc:postgresql://{pg_host}:5432/{pg_database}"
auto_df.write.jdbc(url=url, table=table, mode="Append", properties=properties)
Read a Postgres table into a DataFrame
You need a Postgres JDBC driver to connect to a Postgres database.
Options include:
- Add org.postgresql:postgresql:<version> to spark.jars.packages
- Provide the JDBC driver using spark-submit --jars
- Add the JDBC driver to your Spark runtime (not recommended)
import os
pg_database = os.environ.get("PGDATABASE") or "postgres"
pg_host = os.environ.get("PGHOST") or "localhost"
pg_password = os.environ.get("PGPASSWORD") or "password"
pg_user = os.environ.get("PGUSER") or "postgres"
table = "autompg"
properties = {
"driver": "org.postgresql.Driver",
"user": pg_user,
"password": pg_password,
}
url = f"jdbc:postgresql://{pg_host}:5432/{pg_database}"
df = spark.read.jdbc(url=url, table=table, properties=properties)
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0| 3504.| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0| 3693.| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0| 3436.| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0| 3433.| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0| 3449.| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0| 4341.| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0| 4354.| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0| 4312.| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0| 4425.| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0| 3850.| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
Data Handling Options
Special data handling scenarios.
Provide the schema when loading a DataFrame from CSV
See https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/types.html for a list of types.
from pyspark.sql.types import (
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
)
schema = StructType(
[
StructField("mpg", DoubleType(), True),
StructField("cylinders", IntegerType(), True),
StructField("displacement", DoubleType(), True),
StructField("horsepower", DoubleType(), True),
StructField("weight", DoubleType(), True),
StructField("acceleration", DoubleType(), True),
StructField("modelyear", IntegerType(), True),
StructField("origin", IntegerType(), True),
StructField("carname", StringType(), True),
]
)
df = (
spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load("data/auto-mpg.csv")
)
# Code snippet result:
+----+---------+------------+----------+------+------------+---------+------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|modelyear|origin| carname|
+----+---------+------------+----------+------+------------+---------+------+----------+
|18.0| 8| 307.0| 130.0|3504.0| 12.0| 70| 1|chevrol...|
|15.0| 8| 350.0| 165.0|3693.0| 11.5| 70| 1|buick s...|
|18.0| 8| 318.0| 150.0|3436.0| 11.0| 70| 1|plymout...|
|16.0| 8| 304.0| 150.0|3433.0| 12.0| 70| 1|amc reb...|
|17.0| 8| 302.0| 140.0|3449.0| 10.5| 70| 1|ford to...|
|15.0| 8| 429.0| 198.0|4341.0| 10.0| 70| 1|ford ga...|
|14.0| 8| 454.0| 220.0|4354.0| 9.0| 70| 1|chevrol...|
|14.0| 8| 440.0| 215.0|4312.0| 8.5| 70| 1|plymout...|
|14.0| 8| 455.0| 225.0|4425.0| 10.0| 70| 1|pontiac...|
|15.0| 8| 390.0| 190.0|3850.0| 8.5| 70| 1|amc amb...|
+----+---------+------------+----------+------+------------+---------+------+----------+
only showing top 10 rows
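DataFrameReader.schema() also accepts a DDL-formatted string such as "mpg DOUBLE, cylinders INT" in place of a StructType. The sketch below builds that string from (name, type) pairs in plain Python, so you can check it without Spark. The helper name ddl_schema is hypothetical.

```python
def ddl_schema(columns):
    """Join (name, SQL type) pairs into a DDL schema string."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in columns)


# Same columns and types as the StructType above.
auto_mpg_columns = [
    ("mpg", "DOUBLE"),
    ("cylinders", "INT"),
    ("displacement", "DOUBLE"),
    ("horsepower", "DOUBLE"),
    ("weight", "DOUBLE"),
    ("acceleration", "DOUBLE"),
    ("modelyear", "INT"),
    ("origin", "INT"),
    ("carname", "STRING"),
]

schema_ddl = ddl_schema(auto_mpg_columns)
# With a SparkSession available:
# df = spark.read.format("csv").option("header", True).schema(schema_ddl).load("data/auto-mpg.csv")
```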
Save a DataFrame to CSV, overwriting existing data
auto_df.write.mode("overwrite").csv("output.csv")
Save a DataFrame to CSV with a header
See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html for a list of supported options.
auto_df.coalesce(1).write.csv("header.csv", header="true")
Save a DataFrame in a single CSV file
This example writes the CSV data to a single file. The file is written inside a directory called single.csv and receives a generated part-* name; Spark does not let you choose that name.
If you need a single file with a name you choose, consider converting the data to a Pandas DataFrame and saving it with Pandas.
Either way, all data is funneled to one node before being written, so be careful not to run out of memory.
auto_df.coalesce(1).write.csv("single.csv")
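If Pandas is not an option, another common workaround is to let Spark write the single part file and then rename it. The sketch below assumes the output directory is on the local filesystem; HDFS and object stores need that filesystem's own rename API. The helper name promote_single_part is hypothetical, and the demo only mimics Spark's output layout with a fake part file.

```python
import glob
import os
import shutil
import tempfile


def promote_single_part(output_dir, target_path):
    """Move the lone part-*.csv file in output_dir to target_path.

    Assumes a local filesystem and exactly one part file, as produced by
    auto_df.coalesce(1).write.csv(output_dir).
    """
    parts = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(parts) != 1:
        raise ValueError(f"expected one part file, found {len(parts)}")
    shutil.move(parts[0], target_path)
    # Remove the leftover directory and its marker files (e.g. _SUCCESS).
    shutil.rmtree(output_dir)
    return target_path


# Demo on a throwaway directory that mimics Spark's output layout.
demo_root = tempfile.mkdtemp()
demo_dir = os.path.join(demo_root, "single.csv")
os.makedirs(demo_dir)
with open(os.path.join(demo_dir, "part-00000-demo-c000.csv"), "w") as f:
    f.write("mpg,cylinders\n18.0,8\n")
open(os.path.join(demo_dir, "_SUCCESS"), "w").close()

result = promote_single_part(demo_dir, os.path.join(demo_root, "auto-mpg.csv"))
```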
Save DataFrame as a dynamic partitioned table
When you write using dynamic partitioning, the output partitions are determined by the values of a column rather than specified in code.
The partition values appear as subdirectory names (for example modelyear=70) and are not stored in the output files, i.e. they become "virtual columns". When you read a partitioned table, these virtual columns are part of the DataFrame.
Dynamic partitioning can create many small files, which hurts performance. Make sure the partition columns do not have too many distinct values, and limit the use of multiple virtual columns.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
auto_df.write.mode("append").partitionBy("modelyear").saveAsTable(
"autompg_partitioned"
)
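This sketch makes the "virtual column" idea concrete. Spark writes Hive-style directories named column=value and recovers the column from the path when reading. The pure-Python code below only mimics that layout. The helper names are hypothetical, and the code ignores details such as escaping special characters and Spark's __HIVE_DEFAULT_PARTITION__ marker for null values.

```python
def partition_dir(table_root, row, partition_cols):
    """Return the Hive-style directory a row lands in, e.g. root/modelyear=70."""
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([table_root] + parts)


def virtual_columns(path):
    """Recover partition values from the column=value segments of a path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


row = {"mpg": 18.0, "modelyear": 70, "carname": "chevrolet chevelle malibu"}
path = partition_dir("autompg_partitioned", row, ["modelyear"])
```

The sketch returns recovered values as strings, whereas Spark gives the partition column a proper type in the resulting DataFrame.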
Overwrite specific partitions
Enabling dynamic partitioning lets you add or overwrite partitions based on DataFrame contents. Without dynamic partitioning, an overwrite replaces the entire table.
With dynamic partitioning, partitions whose keys appear in the DataFrame are overwritten, and partitions not in the DataFrame are left untouched. Note that insertInto requires the table to exist already and matches columns by position, not by name.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
your_dataframe.write.mode("overwrite").insertInto("your_table")
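You can model the difference between static and dynamic overwrite with a dictionary keyed by partition value. This is a toy simulation, not Spark code, and the function name overwrite is hypothetical.

```python
def overwrite(table, incoming, mode="static"):
    """Simulate an overwrite on a table stored as {partition_key: rows}.

    static:  the whole table is replaced by the incoming partitions.
    dynamic: only partitions present in incoming are replaced.
    """
    if mode == "static":
        return dict(incoming)
    if mode == "dynamic":
        result = dict(table)
        result.update(incoming)
        return result
    raise ValueError(f"unknown mode: {mode}")


table = {70: ["chevrolet", "buick"], 71: ["ford"], 72: ["amc"]}
incoming = {71: ["ford pinto", "ford maverick"]}

static_result = overwrite(table, incoming, "static")    # only 71 survives
dynamic_result = overwrite(table, incoming, "dynamic")  # 70 and 72 untouched
```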
Load a CSV file with a money column into a DataFrame
Spark's CSV reader does not parse strings like "$1,234.56" as numbers out of the box: currency symbols and thousands separators get in the way. If you need to load monetary amounts, the safest option is to use a parsing library like money_parser.
from pyspark.sql.functions import udf
from pyspark.sql.types import DecimalType
from decimal import Decimal
# Load the text file.
df = (
spark.read.format("csv")
.option("header", True)
.load("data/customer_spend.csv")
)
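# Convert with a hardcoded custom UDF: drop the leading "$" and any commas.
# Null values are passed through instead of raising an error.
money_udf = udf(
lambda x: Decimal(x[1:].replace(",", "")) if x is not None else None,
DecimalType(8, 4),
)
money1 = df.withColumn("spend_dollars", money_udf(df.spend_dollars))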
# Convert with the money_parser library (much safer).
from money_parser import price_str
money_convert = udf(
lambda x: Decimal(price_str(x)) if x is not None else None,
DecimalType(8, 4),
)
df = df.withColumn("spend_dollars", money_convert(df.spend_dollars))
# Code snippet result:
+----------+-----------+-------------+
| date|customer_id|spend_dollars|
+----------+-----------+-------------+
|2020-01-31| 0| 0.0700|
|2020-01-31| 1| 0.9800|
|2020-01-31| 2| 0.0600|
|2020-01-31| 3| 0.6500|
|2020-01-31| 4| 0.5700|
|2020-02-29| 0| 0.1000|
|2020-02-29| 2| 4.4000|
|2020-02-29| 3| 0.3900|
|2020-02-29| 4| 2.1300|
|2020-02-29| 5| 0.8200|
+----------+-----------+-------------+
only showing top 10 rows
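UDF bodies are plain Python, so it is easiest to test the parsing logic on its own before wrapping it with udf(). This sketch is a slightly more defensive version of the hardcoded converter above. It assumes US-style amounts such as "$1,234.56" and does not replace money_parser on messier input. The function name parse_dollars is hypothetical.

```python
from decimal import Decimal, InvalidOperation


def parse_dollars(value):
    """Convert strings like "$1,234.56" to Decimal; return None if unparseable."""
    if value is None:
        return None
    cleaned = value.strip().replace("$", "").replace(",", "")
    try:
        return Decimal(cleaned)
    except InvalidOperation:
        return None


samples = ["$0.07", "$1,234.56", " $4.40 ", None, "n/a"]
parsed = [parse_dollars(s) for s in samples]
# In Spark, wrap it the same way as above: udf(parse_dollars, DecimalType(8, 4))
```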
DataFrame Operations
Adding, removing and modifying DataFrame columns.